This notebook contains code for our implementation of the Deep Q Network (DQN) and Genetic Algorithm (GA) methods, as well as video clips of our best trained models.¶

Note: Our models take too long to train and run full games so we are just including our functions but not calling them.

We did not include our Proximal Policy Optimization (PPO) model because it didn't work but its code in /src

Deep Q Network¶

Dependencies¶

In [24]:
from random import random, randint, sample
import os
import sys
import numpy as np
import torch
import torch.nn as nn
import cv2
from collections import deque

current_folder = os.getcwd()
tetris_folder = os.path.join(current_folder, 'src','dqn')
sys.path.append(tetris_folder)
from modified_tetris import Tetris

Neural Network¶

In [25]:
class DQN(nn.Module):
    def __init__(self):
        super(DQN, self).__init__()

        self.conv1 = nn.Sequential(nn.Linear(4, 64), nn.ReLU(inplace=True))
        self.conv2 = nn.Sequential(nn.Linear(64, 64), nn.ReLU(inplace=True))
        self.conv3 = nn.Sequential(nn.Linear(64, 1))

        self._create_weights()

    def _create_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)
                nn.init.constant_(m.bias, 0)

    def forward(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)

        return x

Training (not called)¶

In [26]:
WIDTH = 10  # Width of board
HEIGHT = 20  # Height of board
BLOCK_SIZE = 30  # Block size when rendering
BATCH_SIZE = 512  # High batch size
LEARNING_RATE = 1e-3
GAMMA = 0.99
INITIAL_EPSILON = 1.0
FINAL_EPSILON = 1e-3
NUM_DECAY_EPOCHS = 1800
NUM_EPOCHS = 3000
SAVE_INTERVAL = 50
REPLAY_MEMORY_SIZE = 28000

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
DEVICE

import matplotlib.pyplot as plt

def train():
    torch.manual_seed(42)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(42)

    env = Tetris(width=WIDTH, height=HEIGHT, block_size=BLOCK_SIZE)
    model = DQN().to(DEVICE)
    optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)
    criterion = nn.MSELoss()

    state = env.reset().to(DEVICE)
    replay_memory = deque(maxlen=REPLAY_MEMORY_SIZE)
    epoch = 0

    # Data for plotting
    epoch_scores = []

    while epoch < NUM_EPOCHS:
        next_steps = env.get_next_states()
        # Epsilon Greedy
        epsilon = FINAL_EPSILON + (max(NUM_DECAY_EPOCHS - epoch, 0) * 
                                   (INITIAL_EPSILON - FINAL_EPSILON) / NUM_DECAY_EPOCHS)
        next_actions, next_states = zip(*next_steps.items())
        next_states = torch.stack(next_states).to(DEVICE)

        model.eval()
        with torch.no_grad():
            predictions = model(next_states)[:, 0]
        model.train()

        if random() <= epsilon:
            index = randint(0, len(next_steps) - 1)
        else:
            index = torch.argmax(predictions).item()

        next_state = next_states[index, :]
        action = next_actions[index]

        reward, done = env.step(action, render=False)
        next_state = next_state.to(DEVICE)
        replay_memory.append([state, reward, next_state, done])

        if done:
            final_score = env.score
            final_tetrominoes = env.tetrominoes
            final_cleared_lines = env.cleared_lines
            state = env.reset().to(DEVICE)

            # Store score for plotting
            epoch_scores.append(final_score)
        else:
            state = next_state
            continue

        if len(replay_memory) < REPLAY_MEMORY_SIZE / 10:
            continue

        epoch += 1
        batch = sample(replay_memory, min(len(replay_memory), BATCH_SIZE))
        state_batch, reward_batch, next_state_batch, done_batch = zip(*batch)
        state_batch = torch.stack(tuple(state for state in state_batch)).to(DEVICE)
        reward_batch = torch.from_numpy(np.array(reward_batch, dtype=np.float32)[:, None]).to(DEVICE)
        next_state_batch = torch.stack(tuple(state for state in next_state_batch)).to(DEVICE)

        q_values = model(state_batch)
        model.eval()
        with torch.no_grad():
            next_prediction_batch = model(next_state_batch)
        model.train()

        # Compute the target Q-values for each transition
        y_values = [
            reward if done else reward + GAMMA * prediction
            for reward, done, prediction in zip(reward_batch, done_batch, next_prediction_batch)
        ]
        y_tensor = torch.tensor(y_values, dtype=torch.float32, device=DEVICE)
        y_batch = y_tensor[:, None]

        optimizer.zero_grad()
        loss = criterion(q_values, y_batch)
        loss.backward()
        optimizer.step()

        print("Epoch: {}/{}, Action: {}, Score: {}, Tetrominoes {}, Cleared lines: {}".format(
            epoch,
            NUM_EPOCHS,
            action,
            final_score,
            final_tetrominoes,
            final_cleared_lines))

        if epoch > 0 and epoch % SAVE_INTERVAL == 0:
            torch.save(model.state_dict(), "saved_model.pth")

    torch.save(model.state_dict(), "saved_model.pth")

    # Plotting the scores
    plt.figure(figsize=(10, 6))
    plt.plot(range(len(epoch_scores)), epoch_scores, label="Score")
    plt.xlabel("Epochs")
    plt.ylabel("Score")
    plt.title("Score vs. Epochs")
    plt.legend()
    plt.grid()
    plt.show()

    return model

Evaluation (not called)¶

In [27]:
FPS = 300
def evaluate_model(model, num_games=10):
    model.eval()
    env = Tetris(width=10, height=20, block_size=30)

    total_score = 0
    total_tetrominoes = 0
    total_lines_cleared = 0

    for game in range(num_games):
        _ = env.reset().to(DEVICE)
        game_score = 0
        game_tetrominoes = 0
        game_lines_cleared = 0

        while True:
            next_steps = env.get_next_states()
            next_actions, next_states = zip(*next_steps.items())
            next_states = torch.stack(next_states).to(DEVICE)

            with torch.no_grad():
                predictions = model(next_states)[:, 0]
            best_action_index = torch.argmax(predictions).item()
            action = next_actions[best_action_index]

            _, done = env.step(action,render=False)

            game_score = env.score
            game_tetrominoes = env.tetrominoes
            game_lines_cleared = env.cleared_lines

            if done:
                break

        # Accumulate totals
        total_score += game_score
        total_tetrominoes += game_tetrominoes
        total_lines_cleared += game_lines_cleared

        print(f"Game {game + 1}/{num_games} - Score: {game_score}, Tetrominoes: {game_tetrominoes}, Lines Cleared: {game_lines_cleared}")

    # Calculate averages
    avg_score = total_score / num_games
    avg_tetrominoes = total_tetrominoes / num_games
    avg_lines_cleared = total_lines_cleared / num_games

    print(f"\nEvaluation Results:")
    print(f"Average Score: {avg_score}")
    print(f"Average Tetrominoes: {avg_tetrominoes}")
    print(f"Average Lines Cleared: {avg_lines_cleared}")

    return avg_score, avg_tetrominoes, avg_lines_cleared
If you want to evaluate our best DQN model, uncomment the last line in the following cell. It will take more than one minute though.¶
In [28]:
current_folder = os.getcwd()
saved_model = os.path.join(current_folder, 'src', 'dqn', 'Trained_Models','adaptation2.pth')

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
agent_tetris = DQN().to(DEVICE)

# Run line below if using CPU
agent_tetris.load_state_dict(torch.load(saved_model, map_location=torch.device('cpu')))

# Run line below if using GPU
# agent_tetris.load_state_dict(torch.load(saved_model))

#avg_score, avg_tetrominoes, avg_lines_cleared = evaluate_model(agent_tetris)
/var/folders/sf/yp04y91j64g6csfzq6jv0vhh0000gn/T/ipykernel_41740/3215735096.py:8: FutureWarning: You are using `torch.load` with `weights_only=False` (the current default value), which uses the default pickle module implicitly. It is possible to construct malicious pickle data which will execute arbitrary code during unpickling (See https://github.com/pytorch/pytorch/blob/main/SECURITY.md#untrusted-models for more details). In a future release, the default value for `weights_only` will be flipped to `True`. This limits the functions that could be executed during unpickling. Arbitrary objects will no longer be allowed to be loaded via this mode unless they are explicitly allowlisted by the user via `torch.serialization.add_safe_globals`. We recommend you start setting `weights_only=True` for any use case where you don't have full control of the loaded file. Please open an issue on GitHub for any issues related to this experimental feature.
  agent_tetris.load_state_dict(torch.load(saved_model, map_location=torch.device('cpu')))
Out[28]:
<All keys matched successfully>
In [29]:
current_folder = os.getcwd()
from IPython.display import Video

video_path = os.path.join(current_folder, 'src', 'dqn', 'dqn_model.mp4')
Video(video_path, embed=True, width=600)
Out[29]:
Your browser does not support the video tag.

Genetic Algorithm¶

Dependencies¶

In [30]:
import sys
import os
import numpy as np
import random
import cv2

current_folder = os.getcwd()
tetris_folder = os.path.join(current_folder, 'src','genetic_algorithm')
sys.path.append(tetris_folder)

import pickle

from tetris import Tetris

Genetic Agent¶

In [31]:
class GeneticAgent:
    def __init__(self, weights=None):
        # Initialize weights for the features:
        # lines_cleared, holes, bumpiness, height
        if weights is None:
            # Random weights between -1 and 1
            self.weights = np.random.uniform(-1, 1, 4)
        else:
            self.weights = weights

    def get_action(self, env):
        # Get possible next states
        next_states = env.get_next_states()
        best_value = -float('inf')
        best_action = None

        for action, features in next_states.items():
            # Compute value using linear combination of features and weights
            value = np.dot(self.weights, features.numpy())
            if value > best_value:
                best_value = value
                best_action = action

        return best_action

Genetic Algorithm¶

In [32]:
class GeneticAlgorithm:
    def __init__(self, population_size, mutation_rate, crossover_rate, elite_fraction):
        self.population_size = population_size
        self.mutation_rate = mutation_rate
        self.crossover_rate = crossover_rate
        self.elite_fraction = elite_fraction
        self.population = [GeneticAgent() for _ in range(population_size)]

    def evaluate_fitness(self, agents, render=False, grid_size=(4, 4)):
        fitness_scores = []
        frames = []
        for idx, agent in enumerate(agents):
            env = Tetris()
            total_score = 0
            done = False
            while not done:
                action = agent.get_action(env)
                reward, done = env.step(action, render=False)
                total_score += reward
                if render:
                    frame = env.render(mode='rgb_array')
                    frames.append((idx, frame))
            fitness_scores.append(total_score)
        if render:
            self.render_games(frames, grid_size)
        return fitness_scores

    def render_games(self, frames, grid_size):
        # Organize frames by agent index
        from collections import defaultdict
        agent_frames = defaultdict(list)
        for idx, frame in frames:
            agent_frames[idx].append(frame)

        max_frames = max(len(f) for f in agent_frames.values())
        for frame_idx in range(max_frames):
            grid_frames = []
            for idx in range(self.population_size):
                if frame_idx < len(agent_frames[idx]):
                    frame = agent_frames[idx][frame_idx]
                else:
                    # If agent finished, show last frame
                    frame = agent_frames[idx][-1]
                grid_frames.append(frame)

            # Create grid image
            grid_img = self.create_grid(grid_frames, grid_size)
            cv2.imshow("Genetic Algorithm Tetris", grid_img)
            key = cv2.waitKey(1)
            if key == 27:  # Esc key to stop
                cv2.destroyAllWindows()
                return

    def create_grid(self, frames, grid_size):
        rows = []
        idx = 0
        frame_height, frame_width, _ = frames[0].shape
        for _ in range(grid_size[0]):
            row_frames = []
            for _ in range(grid_size[1]):
                if idx < len(frames):
                    frame = frames[idx]
                else:
                    # Fill with black images if not enough frames
                    frame = np.zeros((frame_height, frame_width, 3), dtype=np.uint8)
                row_frames.append(frame)
                idx += 1
            row = np.hstack(row_frames)
            rows.append(row)
        grid_img = np.vstack(rows)
        return grid_img

    def select_parents(self, fitness_scores):
        # Normalize fitness scores
        total_fitness = sum(fitness_scores)
        if total_fitness == 0:
            selection_probs = [1 / len(fitness_scores)] * len(fitness_scores)
        else:
            selection_probs = [f / total_fitness for f in fitness_scores]
        parents = np.random.choice(self.population, size=2, p=selection_probs)
        return parents

    def crossover(self, parent1, parent2):
        if random.random() < self.crossover_rate:
            # Single-point crossover
            crossover_point = random.randint(1, len(parent1.weights) - 1)
            child_weights = np.concatenate(
                (parent1.weights[:crossover_point], parent2.weights[crossover_point:])
            )
            return GeneticAgent(weights=child_weights)
        else:
            return GeneticAgent(weights=parent1.weights.copy())

    def mutate(self, agent):
        for i in range(len(agent.weights)):
            if random.random() < self.mutation_rate:
                # Mutate weight with Gaussian noise
                agent.weights[i] += np.random.normal()
        return agent

    def evolve(self, render=False, grid_size=(4, 4)):
        # Evaluate fitness
        fitness_scores = self.evaluate_fitness(self.population, render=render, grid_size=grid_size)
        new_population = []

        # Elitism: keep the top-performing agents
        elite_size = int(self.elite_fraction * self.population_size)
        elite_indices = np.argsort(fitness_scores)[-elite_size:]
        elite_agents = [self.population[i] for i in elite_indices]
        new_population.extend(elite_agents)

        # Generate the rest of the new population
        while len(new_population) < self.population_size:
            parent1, parent2 = self.select_parents(fitness_scores)
            child = self.crossover(parent1, parent2)
            child = self.mutate(child)
            new_population.append(child)

        self.population = new_population
        max_fitness = max(fitness_scores)
        avg_fitness = sum(fitness_scores) / len(fitness_scores)
        min_fitness = min(fitness_scores)
        return max_fitness, avg_fitness, min_fitness

Training¶

Here we are going to use the genetic algorithm above to implement a 2 generation training session using the same hyperparameters that were used for our best run. This will pop up a Genetic Algorithm Training window, rendered to show how the gnerations and population works. If you get lucky in the first or second generation, please stop the runtime of the code below or it will go for a while.

In [33]:
def train_snippet():
    generations = 2
    population_size = 4
    mutation_rate = 0.1
    crossover_rate = 0.7
    elite_fraction = 0.2

    ga = GeneticAlgorithm(population_size, mutation_rate, crossover_rate, elite_fraction)

    print("Starting training snippet with rendering...")
    for generation in range(generations):
        print(f"Generation {generation + 1}")
        ga.evolve(render=True, grid_size=(2, 2))

    print("Training complete.")
In [34]:
train_snippet()
Starting training snippet with rendering...
Generation 1
2024-12-11 21:53:52.429 python[41740:17189686] +[IMKClient subclass]: chose IMKClient_Modern
2024-12-11 21:53:52.429 python[41740:17189686] +[IMKInputSession subclass]: chose IMKInputSession_Modern
Generation 2
Training complete.

Render Best Model Trained¶

This snippet will showcases the end of a random run that we did with our best agent that was trained.

In [35]:
current_folder = os.getcwd()
from IPython.display import Video

video_path = os.path.join(current_folder, 'src', 'genetic_algorithm', 'Results', '30secondClip.mp4')
Video(video_path, embed=True, width=600)
Out[35]:
Your browser does not support the video tag.

Fitness Trends Results¶

In [36]:
from IPython.display import Image
Image(filename="src/genetic_algorithm/Results/fitness_trends.png")
Out[36]: